/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.mapreduce;

import java.sql.Array;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PDate;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.util.ColumnInfo;
import org.joda.time.DateTime;

public class PhoenixRecordWritable implements DBWritable {

  private final List<Object> upsertValues = new ArrayList<>();
  private final Map<String, Object> resultMap = new LinkedHashMap<>();
  private List<ColumnInfo> columnMetaDataList;

  /** For serialization; do not use. */
  public PhoenixRecordWritable() {
    this(new ArrayList<ColumnInfo>());
  }

  public PhoenixRecordWritable(List<ColumnInfo> columnMetaDataList) {
    this.columnMetaDataList = columnMetaDataList;
  }

  /**
   * Helper method to create a {@link Array} for a specific {@link PDataType}, and set it on the
   * provided {@code stmt}.
   */
  private static void setArrayInStatement(PreparedStatement stmt, PDataType<?> type, Object[] obj,
    int position) throws SQLException {
    Array sqlArray =
      stmt.getConnection().createArrayOf(PDataType.arrayBaseType(type).getSqlTypeName(), obj);
    stmt.setArray(position, sqlArray);
  }

  private static Object[] primativeArrayToObjectArray(byte[] a) {
    final Byte[] ret = new Byte[a.length];
    for (int i = 0; i < a.length; i++) {
      ret[i] = a[i];
    }
    return ret;
  }

  private static Object[] primativeArrayToObjectArray(short[] a) {
    final Short[] ret = new Short[a.length];
    for (int i = 0; i < a.length; i++) {
      ret[i] = a[i];
    }
    return ret;
  }

  private static Object[] primativeArrayToObjectArray(int[] a) {
    final Integer[] ret = new Integer[a.length];
    for (int i = 0; i < a.length; i++) {
      ret[i] = a[i];
    }
    return ret;
  }

  private static Object[] primativeArrayToObjectArray(float[] a) {
    final Float[] ret = new Float[a.length];
    for (int i = 0; i < a.length; i++) {
      ret[i] = a[i];
    }
    return ret;
  }

  private static Object[] primativeArrayToObjectArray(double[] a) {
    final Double[] ret = new Double[a.length];
    for (int i = 0; i < a.length; i++) {
      ret[i] = a[i];
    }
    return ret;
  }

  private static Object[] primativeArrayToObjectArray(long[] a) {
    final Long[] ret = new Long[a.length];
    for (int i = 0; i < a.length; i++) {
      ret[i] = a[i];
    }
    return ret;
  }

  @Override
  public void write(PreparedStatement statement) throws SQLException {
    // make sure we at least line up in size
    if (upsertValues.size() != columnMetaDataList.size()) {
      throw new UnsupportedOperationException("Provided " + upsertValues.size()
        + " upsert values, but column metadata expects " + columnMetaDataList.size() + " columns.");
    }

    // correlate each value (v) to a column type (c) and an index (i)
    for (int i = 0; i < upsertValues.size(); i++) {
      Object v = upsertValues.get(i);
      ColumnInfo c = columnMetaDataList.get(i);

      if (v == null) {
        statement.setNull(i + 1, c.getSqlType());
        continue;
      }

      // both Java and Joda dates used to work in 4.2.3, but now they must be java.sql.Date
      // can override any other types here as needed
      final Object finalObj;
      final PDataType<?> finalType;
      if (v instanceof DateTime) {
        finalObj = new java.sql.Date(((DateTime) v).getMillis());
        finalType = PDate.INSTANCE;
      } else if (v instanceof java.util.Date) {
        finalObj = new java.sql.Date(((java.util.Date) v).getTime());
        finalType = PDate.INSTANCE;
      } else {
        finalObj = v;
        finalType = c.getPDataType();
      }

      if (finalObj instanceof Object[]) {
        setArrayInStatement(statement, finalType, (Object[]) finalObj, i + 1);
      } else if (finalObj instanceof byte[]) {
        // PVarbinary and PBinary are provided as byte[] but are treated as SQL objects
        if (PDataType.equalsAny(finalType, PVarbinary.INSTANCE, PBinary.INSTANCE)) {
          statement.setObject(i + 1, finalObj);
        } else {
          // otherwise set as array type
          setArrayInStatement(statement, finalType, primativeArrayToObjectArray((byte[]) finalObj),
            i + 1);
        }
      } else if (finalObj instanceof short[]) {
        setArrayInStatement(statement, finalType, primativeArrayToObjectArray((short[]) finalObj),
          i + 1);
      } else if (finalObj instanceof int[]) {
        setArrayInStatement(statement, finalType, primativeArrayToObjectArray((int[]) finalObj),
          i + 1);
      } else if (finalObj instanceof long[]) {
        setArrayInStatement(statement, finalType, primativeArrayToObjectArray((long[]) finalObj),
          i + 1);
      } else if (finalObj instanceof float[]) {
        setArrayInStatement(statement, finalType, primativeArrayToObjectArray((float[]) finalObj),
          i + 1);
      } else if (finalObj instanceof double[]) {
        setArrayInStatement(statement, finalType, primativeArrayToObjectArray((double[]) finalObj),
          i + 1);
      } else {
        statement.setObject(i + 1, finalObj);
      }
    }
  }

  @Override
  public void readFields(ResultSet resultSet) throws SQLException {
    ResultSetMetaData metaData = resultSet.getMetaData();
    for (int i = 1; i <= metaData.getColumnCount(); i++) {
      // return the contents of a PhoenixArray, if necessary
      Object value = resultSet.getObject(i);
      // put a (ColumnLabel -> value) entry into the result map
      resultMap.put(metaData.getColumnLabel(i), value);
    }
  }

  /** Append an object to the list of values to upsert. */
  public void add(Object value) {
    upsertValues.add(value);
  }

  /** Returns an immutable view on the {@link ResultSet} content. */
  public Map<String, Object> getResultMap() {
    return Collections.unmodifiableMap(resultMap);
  }
}
